store.go
func (store *Store) TransferTx(ctx context.Context, arg TransferTxParams) (TransferTxResult, error) {
var result TransferTxResult
err := store.execTx(ctx, func(q *Queries) error {
var err error
txName := ctx.Value(txKey)
fmt.Println(txName, "create transfer")
// should convert arg (type TransferTxParams) to CreateTransferParams instead of using struct literal (gosimple)
// Because CreateTransferParams has same fields as TransferTxParams.
result.Transfer, err = q.CreateTransfer(ctx, CreateTransferParams(arg))
if err != nil {
return err
}
fmt.Println(txName, "create entry 1")
result.FromEntry, err = q.CreateEntry(ctx, CreateEntryParams{
AccountID: arg.FromAccountID,
Amount: -arg.Amount,
})
if err != nil {
return err
}
fmt.Println(txName, "create entry 2")
result.ToEntry, err = q.CreateEntry(ctx, CreateEntryParams{
AccountID: arg.ToAccountID,
Amount: arg.Amount,
})
if err != nil {
return err
}
// Implement update accounts' balance
// move money out of account1
fmt.Println(txName, "update account 1")
result.FromAccount, err = q.AddAccountBalance(ctx, AddAccountBalanceParams{
ID: arg.FromAccountID,
Amount: -arg.Amount,
})
if err != nil {
return err
}
// move money into account2
fmt.Println(txName, "update account 2")
result.ToAccount, err = q.AddAccountBalance(ctx, AddAccountBalanceParams{
ID: arg.ToAccountID,
Amount: arg.Amount,
})
if err != nil {
return err
}
return nil
})
return result, err
}
在上一階段已經解決了foreign key所
操成的deadlock
,並且也將*GetAccountForUpdate
和*UpdateAccount
進行了合併,減少Transaction
的執行時間,大大”降低
”了 Deadlock
的可能性。
-- name: GetAccountForUpdate :one
SELECT *
FROM accounts
WHERE id = $1
LIMIT 1
FOR NO KEY
UPDATE;
-- name: UpdateAccount :one
UPDATE accounts
set balance = $2
WHERE id = $1
RETURNING *;
-- name: AddAccountBalance :one
UPDATE accounts
SET balance = balance + sqlc.arg(amount)
WHERE id = sqlc.arg(id)
RETURNING *;
store_test.go
func TestTransferTx(t *testing.T) {
store := NewStore(testDB)
// check results
// init key value map
existed := make(map[int]bool)
// Create two accounts.
account1 := createRandomAccount(t)
account2 := createRandomAccount(t)
fmt.Println(">> before:", account1.Balance, account2.Balance)
n := 5
amount := int64(10)
// run n concurrent transfer transaction
// Simulate multiple concurrent requests in real world
errs := make(chan error)
results := make(chan TransferTxResult)
for i := 0; i < n; i++ {
txName := fmt.Sprintf("tx %d", i+1)
// 匿名函數的 goroutine
go func() {
ctx := context.WithValue(context.Background(), txKey, txName)
result, err := store.TransferTx(ctx, TransferTxParams{
FromAccountID: account1.ID,
ToAccountID: account2.ID,
Amount: amount,
})
errs <- err
results <- result
}()
}
// check results
for i := 0; i < n; i++ {
err := <-errs
require.NoError(t, err)
result := <-results
require.NotEmpty(t, result)
// check transfer
transfer := result.Transfer
require.NotEmpty(t, transfer)
require.Equal(t, account1.ID, transfer.FromAccountID)
require.Equal(t, account2.ID, transfer.ToAccountID)
require.Equal(t, amount, transfer.Amount)
require.NotZero(t, transfer.ID)
require.NotZero(t, transfer.CreatedAt)
// Need to to be sure that a transfer record is really created in the database
_, err = store.GetTransfer(context.Background(), transfer.ID)
require.NoError(t, err)
// check entries
fromEntry := result.FromEntry
require.NotEmpty(t, fromEntry)
require.Equal(t, account1.ID, fromEntry.AccountID)
require.Equal(t, -amount, fromEntry.Amount)
require.NotZero(t, fromEntry.ID)
require.NotZero(t, fromEntry.CreatedAt)
_, err = store.GetEntry(context.Background(), fromEntry.ID)
require.NoError(t, err)
toEntry := result.ToEntry
require.NotEmpty(t, toEntry)
require.Equal(t, account2.ID, toEntry.AccountID)
require.Equal(t, amount, toEntry.Amount)
require.NotZero(t, toEntry.ID)
require.NotZero(t, toEntry.CreatedAt)
_, err = store.GetEntry(context.Background(), toEntry.ID)
require.NoError(t, err)
// check accounts
fromAccount := result.FromAccount
require.NotEmpty(t, fromAccount)
require.Equal(t, account1.ID, fromAccount.ID)
toAccount := result.ToAccount
require.NotEmpty(t, toAccount)
require.Equal(t, account2.ID, toAccount.ID)
// check balances
fmt.Println(">> tx:", fromAccount.Balance, toAccount.Balance)
// check account balance
// account1.balance and account2.Balance is original balance
// fromAccount.balance and toAccount.Balance is after transfer
diff1 := account1.Balance - fromAccount.Balance
diff2 := toAccount.Balance - account2.Balance
require.Equal(t, diff1, diff2)
require.True(t, diff1 > 0)
require.True(t, diff1%amount == 0)
// 1 <= k <= n
k := int(diff1 / amount)
require.True(t, k >= 1 && k <= n)
require.NotContains(t, existed, k)
existed[k] = true
}
// check the final updated balance
updatedAccount1, err := store.GetAccountForUpdate(context.Background(), account1.ID)
require.NoError(t, err)
updatedAccount2, err := store.GetAccountForUpdate(context.Background(), account2.ID)
require.NoError(t, err)
fmt.Println(">> after:", updatedAccount1.Balance, updatedAccount2.Balance)
require.Equal(t, account1.Balance-int64(n)*amount, updatedAccount1.Balance)
require.Equal(t, account2.Balance+int64(n)*amount, updatedAccount2.Balance)
}
為何會說只是降低了可能呢? 雖然我們在unit test中模擬了多個並行的轉帳交易,但這些交易的操作順序都是一致
的從account 1
轉到account 2
(先從account 1
減少餘額,再向account 2
增加餘額),所以較不容易出現DeadLock
的情況。
但現實中會有更復雜的Transaction
,例如,同時有一個交易是從account 1
轉到account 2
,而另一個交易剛好是從account 2
轉到account 1,
這樣的情況下,如果這兩個交易的操作順序不一致,就有可能出現死鎖。(transaction1
先鎖定了account 1
,同時transaction2
也鎖定了account 2
,這兩個交易都會等待對方釋放Lock
的資源,這就會導致DeadLock
的情況。)
我們可以用以下SQL
並搭配平行處理來進行模擬與確認:
-- Tx1: transfer $10 from account 1 to account 2
BEGIN;
UPDATE accounts SET balance = balance - 10 WHERE id = 1 RETURNING *;
UPDATE accounts SET balance = balance + 10 WHERE id = 2 RETURNING *;
ROLLBACK;
-- Tx2: transfer $10 from account 2 to account 1
BEGIN;
UPDATE accounts SET balance = balance - 10 WHERE id = 2 RETURNING *;
UPDATE accounts SET balance = balance + 10 WHERE id = 1 RETURNING *;
ROLLBACK;
我們建立兩個Terminal 1
和 Terminal 2
,並都進入到simple_bank中:
docker exec -it postgres psql -U root simple_bank
接下來在Terminal 1
和 Terminal 2
各自執行第一步驟的UPDATE,這時Transaction 1
會blocked account 1
而 Transaction 1
會blocked account2
:
Termainal 1:
simple_bank=# BEGIN;
BEGIN
simple_bank=*#
simple_bank=*# UPDATE accounts SET balance = balance - 10 WHERE id = 1 RETURNING *;
id | owner | balance | currency | created_at
----+--------+---------+----------+-------------------------------
1 | tbsyig | 838 | EUR | 2023-08-17 01:32:51.623403+00
(1 row)
UPDATE 1
Termainal 2:
simple_bank=# BEGIN;
BEGIN
simple_bank=*#
simple_bank=*# UPDATE accounts SET balance = balance - 10 WHERE id = 2 RETURNING *;
id | owner | balance | currency | created_at
----+--------+---------+----------+-------------------------------
2 | shyufr | 676 | EUR | 2023-08-17 01:32:51.653722+00
(1 row)
UPDATE 1
如果Transaction 1
執行下一步驟的Update account2
就會發生Deadlock
Termainal 1:
simple_bank=# BEGIN;
BEGIN
simple_bank=*#
simple_bank=*# UPDATE accounts SET balance = balance - 10 WHERE id = 1 RETURNING *;
id | owner | balance | currency | created_at
----+--------+---------+----------+-------------------------------
1 | tbsyig | 838 | EUR | 2023-08-17 01:32:51.623403+00
(1 row)
UPDATE 1
UPDATE accounts SET balance = balance + 10 WHERE id = 2 RETURNING *;
Termainal 2:
simple_bank=# BEGIN;
BEGIN
simple_bank=*#
simple_bank=*# UPDATE accounts SET balance = balance - 10 WHERE id = 2 RETURNING *;
id | owner | balance | currency | created_at
----+--------+---------+----------+-------------------------------
2 | shyufr | 676 | EUR | 2023-08-17 01:32:51.653722+00
(1 row)
UPDATE 1
接下來我們到Table Plus 執下以下SQL Query來列出目前lock的清況,可以發現transaction 1對於update account 2
的query正在嘗試獲取一個transaction ID 800的ShareLock
,但它還沒有被授予。
SELECT
a.application_name,
l.relation::regclass,
l.transactionid,
l.mode,
l.locktype,
l.GRANTED,
a.usename,
a.query,
a.pid
FROM
pg_stat_activity a
JOIN pg_locks l ON
l.pid = a.pid
WHERE
a.application_name = 'psql'
ORDER BY
a.pid;
這是因為transaction 2
已經持有了同一個transaction ID
的ExclusiveLock
因此,transaction 1
必須等待transaction 2
完成後才能繼續。
同等如果我們transaction 2
執行update account 1
的query,我們就會收到deadlock
的通知,因為account 1
正在被transaction 1
更新,因此transaction 2
也需要等待transaction 1
完成才能獲得此查詢的結果。這兩個concurrent transactions
都需要等待對方,因此發生了死鎖:
TestTransferTxDeadlock
用來檢測deadlock
error
:
transaction
: account 1
→ account 2
(5 times), account 2
→ account 1
(5 times)deadlock
的行為,所以並不需要驗證results
transaction
後,account 1
和 account 2
的Balance
不會改變。func TestTransferTxDeadlock(t *testing.T) {
store := NewStore(testDB)
// Create two accounts.
account1 := createRandomAccount(t)
account2 := createRandomAccount(t)
fmt.Println(">> before:", account1.Balance, account2.Balance)
n := 10
amount := int64(10)
// run n concurrent transfer transaction
// Simulate multiple concurrent requests in real world
errs := make(chan error)
for i := 0; i < n; i++ {
txName := fmt.Sprintf("tx %d", i+1)
fromAccountID := account1.ID
toAccountID := account2.ID
if i%2 == 1 {
fromAccountID = account2.ID
toAccountID = account1.ID
}
// 匿名函數的 goroutine
go func() {
ctx := context.WithValue(context.Background(), txKey, txName)
_, err := store.TransferTx(ctx, TransferTxParams{
FromAccountID: fromAccountID,
ToAccountID: toAccountID,
Amount: amount,
})
errs <- err
}()
}
// check results
for i := 0; i < n; i++ {
err := <-errs
require.NoError(t, err)
}
// check the final updated balance
updatedAccount1, err := store.GetAccountForUpdate(context.Background(), account1.ID)
require.NoError(t, err)
updatedAccount2, err := store.GetAccountForUpdate(context.Background(), account2.ID)
require.NoError(t, err)
fmt.Println(">> after:", updatedAccount1.Balance, updatedAccount2.Balance)
require.Equal(t, account1.Balance, updatedAccount1.Balance)
require.Equal(t, account2.Balance, updatedAccount2.Balance)
}
transaction
順序transaction1
先鎖定了account 1
,同時transaction2
也鎖定了account 2
,這兩個交易都會等待對方釋放Lock
的資源,這就會導致DeadLock
的情況。deadlock
**,只需讓兩個交易按相同的順序更新帳戶餘額。例如,在transaction2
中,我們只需將更新account 1
的查詢提前,其他保持不變就不會發生阻塞。-- Tx1: transfer $10 from account 1 to account 2
BEGIN;
UPDATE accounts SET balance = balance - 10 WHERE id = 1 RETURNING *;
UPDATE accounts SET balance = balance + 10 WHERE id = 2 RETURNING *;
ROLLBACK;
-- Tx2: transfer $10 from account 2 to account 1
BEGIN;
UPDATE accounts SET balance = balance + 10 WHERE id = 1 RETURNING *; -- moved up
UPDATE accounts SET balance = balance - 10 WHERE id = 2 RETURNING *;
ROLLBACK;
Transaction2
的動作,需要等待Transaction1
操作完成,這樣就不會相互持有lock
:
當Transaction1
的操作完成(COMMIT
/ROLLBACK
),Transaction2
才會繼續進行:
所以我們知道防止DeadLock的最佳方式是確保應用程序總是以一致的Transaction
順序獲取Lock。
store.go
func (store *Store) TransferTx(ctx context.Context, arg TransferTxParams) (TransferTxResult, error) {
var result TransferTxResult
err := store.execTx(ctx, func(q *Queries) error {
...
if arg.FromAccountID < arg.ToAccountID {
result.FromAccount, err = q.AddAccountBalance(ctx, AddAccountBalanceParams{
ID: arg.FromAccountID,
Amount: -arg.Amount,
})
if err != nil {
return err
}
result.ToAccount, err = q.AddAccountBalance(ctx, AddAccountBalanceParams{
ID: arg.ToAccountID,
Amount: arg.Amount,
})
if err != nil {
return err
}
} else {
result.ToAccount, err = q.AddAccountBalance(ctx, AddAccountBalanceParams{
ID: arg.ToAccountID,
Amount: arg.Amount,
})
if err != nil {
return err
}
result.FromAccount, err = q.AddAccountBalance(ctx, AddAccountBalanceParams{
ID: arg.FromAccountID,
Amount: -arg.Amount,
})
if err != nil {
return err
}
}
return nil
})
return result, err
}
建了保持Transaction的動作一致,我們透過AccountID來判斷執行順序,但現在的function太過冗長,我們可以將整體Update Account Balance的動作抽換成單一的function來處理(addMoney
),再配合原先的判斷式來帶入不同的參數:
func addMoney(
ctx context.Context,
q *Queries,
accountID1 int64,
amount1 int64,
accountID2 int64,
amount2 int64,
) (account1 Account, account2 Account, err error) {
account1, err = q.AddAccountBalance(ctx, AddAccountBalanceParams{
ID: accountID1,
Amount: amount1,
})
if err != nil {
return
}
account2, err = q.AddAccountBalance(ctx, AddAccountBalanceParams{
ID: accountID2,
Amount: amount2,
})
return
}
func (store *Store) TransferTx(ctx context.Context, arg TransferTxParams) (TransferTxResult, error) {
var result TransferTxResult
err := store.execTx(ctx, func(q *Queries) error {
...
if arg.FromAccountID < arg.ToAccountID {
result.FromAccount, result.ToAccount, err = addMoney(ctx, q, arg.FromAccountID, -arg.Amount, arg.ToAccountID, arg.Amount)
} else {
result.ToAccount, result.FromAccount, err = addMoney(ctx, q, arg.ToAccountID, arg.Amount, arg.FromAccountID, -arg.Amount)
}
return err
})
return result, err
}
這樣的改動帶來以下幾個好處:
addMoney
中,可以在其他地方更容易地重用這段代碼,減少代碼重複。TransferTx
函數中直接執行轉帳操作,這使得這個函數變得相對冗長。通過將部分邏輯抽取到外部函數中,可以使TransferTx
函數更簡潔、易讀。addMoney
函數,我們可以將這些重複的邏輯合併起來,減少代碼的複雜性。addMoney
函數即可,無需在多個地方進行修改,降低了維護成本。為何相同的順序下就能解決deadlock
?
什麼是 naked return
?
(account1 Account, account2 Account, err error)
。因此,當在函式體內遇到 return
語句時,該函式將返回 account1
、account2
和 err
變量的當前值。func addMoney(ctx context.Context,
q *Queries,
accountID1 int64,
amount1 int64,
accountID2 int64,
amount2 int64,
) (account1 Account, account2 Account, err error) {
account1, err = q.AddAccountBalance(ctx, AddAccountBalanceParams{
ID: accountID1,
Amount: amount1,
})
if err != nil {
return
}
account2, err = q.AddAccountBalance(ctx, AddAccountBalanceParams{
ID: accountID2,
Amount: amount2,
})
return
}